package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/segmentio/kafka-go"

	"errors"
)

const (
	// TODO test...
	t1Topic = "my-topic123"
	t2Topic = "my-topic456"
)

// Gracefully handle interrupt
var (
	ctx, cancelFunc = context.WithCancel(context.Background())
)

func init() {
	go func() {
		sig := make(chan os.Signal, 1)
		signal.Notify(sig, os.Interrupt)
		<-sig
		cancelFunc()
	}()
}

func main() {
	// TODO test...
	t1Broker := "127.0.0.1:9092"
	t2Broker := "127.0.0.1:9092"

	// TODO test...
	groupID := "g2"

	t1Reader := createReader(t1Broker, t1Topic, groupID)
	t2Reader := createReader(t2Broker, t2Topic, groupID)

	wg := &sync.WaitGroup{}
	wg.Add(2)

	go consumeMessages(wg, t1Reader)
	go consumeMessages(wg, t2Reader)

	// Handle interrupt to gracefully shutdown
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, os.Interrupt)

	<-sigchan

	log.Println("Shutting down...")
	cancelFunc()
	wg.Wait()
}

func createReader(broker, topic, groupID string) *kafka.Reader {
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers:        []string{broker},
		Topic:          topic,
		GroupID:        groupID,
		MinBytes:       10e3, // 10KB
		MaxBytes:       10e6, // 10MB
		MaxWait:        500 * time.Millisecond,
		CommitInterval: 5 * time.Second,
	})
}

func consumeMessages(wg *sync.WaitGroup, reader *kafka.Reader) {

	// TODO 实际上 主协程停止了 最好给子协程发一个信号～～退出这个子协程～～
	defer func() {
		wg.Done()
	}()

	for {
		message, err := reader.ReadMessage(context.Background())
		if err != nil {
			if isTransientNetworkError(err) {
				log.Println("Network error, reconnecting...........")
				time.Sleep(5 * time.Second) // Wait before reconnecting
				continue
			}

			// TODO 其他情况 return，这也意味着不会再消费消息了～～～
			log.Printf("Error reading message: %v\n", err)
			return
		} else {
			fmt.Printf("Topic: %s, Partition: %d, Offset: %d, Value: %s\n",
				message.Topic, message.Partition, message.Offset, string(message.Value))
		}
	}
}

// Notice kafka-go源码 私有函数 挪过来
// TODO 实际中需要新增更多重试的情况～～
func isTransientNetworkError(err error) bool {
	return errors.Is(err, io.ErrUnexpectedEOF) ||
		errors.Is(err, syscall.ECONNREFUSED) ||
		errors.Is(err, syscall.ECONNRESET) ||
		errors.Is(err, syscall.EPIPE)
}
